package handler

import (
	"bufio"
	"context"
	"github.com/DiracLee/dires-go/logger"
	"github.com/DiracLee/dires-go/syncx"
	"io"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// EchoHandler serves connections by echoing each received line back to the
// sender, and keeps track of the connections it is serving so that Close can
// shut them down.
type EchoHandler struct {
	activeConn sync.Map // used as a set: keys are *EchoWorker, values are struct{}{}
	closed     uint32   // set to 1 by Close; read and written through sync/atomic
}

// EchoWorker holds the state of a single connection served by EchoHandler.
type EchoWorker struct {
	conn net.Conn        // the client connection being served
	wg   syncx.WaitGroup // incremented around each echo write; Close waits on it with a timeout
}

// defaultWaitTimeout is the timeout EchoWorker.Close passes to
// WaitWithTimeout on the worker's wait group before closing the connection.
const defaultWaitTimeout = 10 * time.Second

// Close shuts down the worker's connection.
//
// It first waits on the worker's wait group, bounded by defaultWaitTimeout,
// so that an echo write that is still in progress gets a chance to finish.
// The outcome of that wait is not inspected: the connection is closed either
// way, and the error from closing it is returned.
func (w *EchoWorker) Close() error {
	w.wg.WaitWithTimeout(defaultWaitTimeout)

	err := w.conn.Close()
	return err
}

// Handle echoes every newline-terminated message read from conn back to the
// peer until a read or write fails.
//
// If the handler has already been closed, conn is closed immediately and
// nothing is served. Otherwise the connection is registered in activeConn
// while it is being served, so that EchoHandler.Close can reach it. When
// Handle returns, for whatever reason, the connection is unregistered and
// closed.
//
// ctx is accepted to satisfy the handler signature but is not consulted.
func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
	if atomic.LoadUint32(&h.closed) != 0 {
		// The handler is shutting down: refuse the connection and stop here
		// instead of registering and reading from a connection just closed.
		_ = conn.Close()
		return
	}

	worker := &EchoWorker{
		conn: conn,
	}
	h.activeConn.Store(worker, struct{}{})
	defer func() {
		// Unregister on every exit path, not only on EOF, so failed
		// connections do not accumulate in activeConn.
		h.activeConn.Delete(worker)
		// Best-effort close. EchoHandler.Close may already have closed the
		// connection, in which case the resulting error carries no useful
		// information and is dropped.
		_ = conn.Close()
	}()

	reader := bufio.NewReader(conn)
	for {
		// Read one line at a time; the delimiter is kept so the echo is exact.
		msg, err := reader.ReadString('\n')
		if err != nil {
			if err == io.EOF {
				logger.Info("[EchoHandler::Handle] an echo connection closed")
			} else {
				logger.Errorf("[EchoHandler::Handle] echo read message error: %v", err)
			}
			return
		}

		// Mark the write as in flight so EchoWorker.Close can wait for it.
		worker.wg.Add(1)
		_, err = conn.Write([]byte(msg))
		worker.wg.Done()
		if err != nil {
			// The peer can no longer be answered; stop serving it.
			logger.Errorf("[EchoHandler::Handle] echo write message error: %v", err)
			return
		}
	}
}

// Close marks the handler as closed and then closes every worker currently
// stored in activeConn.
//
// Errors from closing individual workers are discarded; Close always returns
// nil. Every key in activeConn is expected to be a *EchoWorker, so a key of
// any other type makes the type assertion panic.
func (h *EchoHandler) Close() error {
	logger.Info("[EchoHandler::Close] shut down echo handler ...")

	// Raise the flag before walking the map so that Handle calls that check
	// it afterwards refuse their connection.
	atomic.StoreUint32(&h.closed, 1)

	closeWorker := func(k, _ interface{}) bool {
		_ = k.(*EchoWorker).Close()
		// Keep iterating: every registered worker must be visited.
		return true
	}
	h.activeConn.Range(closeWorker)

	return nil
}
